/* The latency monitor allows to easily observe the sources of latency
 * in an instance using the LATENCY command. Different latency
 * sources are monitored, like disk I/O, execution of commands, fork
 * system call, and so forth.
 *
 * ----------------------------------------------------------------------------
 *
 * Copyright (c) 2014, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
#include "hdr_histogram.h"

/* Dictionary type for latency events. */
int dictStringKeyCompare(dict *d, const void *key1, const void *key2) {
    UNUSED(d);
    return strcmp(key1, key2) == 0;
}

uint64_t dictStringHash(const void *key) {
    return dictGenHashFunction(key, strlen(key));
}

dictType latencyTimeSeriesDictType = {
    dictStringHash,       /* hash function */
    NULL,                 /* key dup */
    dictStringKeyCompare, /* key compare */
    dictVanillaFree,      /* key destructor */
    dictVanillaFree,      /* val destructor */
    NULL                  /* allow to expand */
};

/* ------------------------- Utility functions ------------------------------ */

/* Report the amount of AnonHugePages in smap, in bytes. If the return
 * value of the function is non-zero, the process is being targeted by
 * THP support, and is likely to have memory usage / latency issues. */
int THPGetAnonHugePagesSize(void) {
    return zmalloc_get_smap_bytes_by_field("AnonHugePages:", -1);
}

/* ---------------------------- Latency API --------------------------------- */

/* Latency monitor initialization. We just need to create the dictionary
 * of time series, each time series is created on demand in order to avoid
 * having a fixed list to maintain. */
void latencyMonitorInit(void) {
    server.latency_events = dictCreate(&latencyTimeSeriesDictType);
}

/* Add the specified sample to the specified time series "event".
 * This function is usually called via latencyAddSampleIfNeeded(), that
 * is a macro that only adds the sample if the latency is higher than
 * server.latency_monitor_threshold. */
void latencyAddSample(const char *event, mstime_t latency) {
    struct latencyTimeSeries *ts = dictFetchValue(server.latency_events, event);
    time_t now = time(NULL);
    int prev;

    /* Create the time series if it does not exist. */
    if (ts == NULL) {
        ts = zmalloc(sizeof(*ts));
        ts->idx = 0;
        ts->max = 0;
        memset(ts->samples, 0, sizeof(ts->samples));
        dictAdd(server.latency_events, zstrdup(event), ts);
    }

    if (latency > ts->max) ts->max = latency;

    /* If the previous sample is in the same second, we update our old sample
     * if this latency is > of the old one, or just return. */
    prev = (ts->idx + LATENCY_TS_LEN - 1) % LATENCY_TS_LEN;
    if (ts->samples[prev].time == now) {
        if (latency > ts->samples[prev].latency) ts->samples[prev].latency = latency;
        return;
    }

    ts->samples[ts->idx].time = now;
    ts->samples[ts->idx].latency = latency;

    ts->idx++;
    if (ts->idx == LATENCY_TS_LEN) ts->idx = 0;
}

/* Reset data for the specified event, or all the events data if 'event' is
 * NULL.
 *
 * Note: this is O(N) even when event_to_reset is not NULL because makes
 * the code simpler and we have a small fixed max number of events. */
int latencyResetEvent(char *event_to_reset) {
    dictIterator *di;
    dictEntry *de;
    int resets = 0;

    di = dictGetSafeIterator(server.latency_events);
    while ((de = dictNext(di)) != NULL) {
        char *event = dictGetKey(de);

        if (event_to_reset == NULL || strcasecmp(event, event_to_reset) == 0) {
            dictDelete(server.latency_events, event);
            resets++;
        }
    }
    dictReleaseIterator(di);
    return resets;
}

/* ------------------------ Latency reporting (doctor) ---------------------- */

/* Analyze the samples available for a given event and return a structure
 * populate with different metrics, average, MAD, min, max, and so forth.
 * Check latency.h definition of struct latencyStats for more info.
 * If the specified event has no elements the structure is populate with
 * zero values. */
void analyzeLatencyForEvent(char *event, struct latencyStats *ls) {
    struct latencyTimeSeries *ts = dictFetchValue(server.latency_events, event);
    int j;
    uint64_t sum;

    ls->all_time_high = ts ? ts->max : 0;
    ls->avg = 0;
    ls->min = 0;
    ls->max = 0;
    ls->mad = 0;
    ls->samples = 0;
    ls->period = 0;
    if (!ts) return;

    /* First pass, populate everything but the MAD. */
    sum = 0;
    for (j = 0; j < LATENCY_TS_LEN; j++) {
        if (ts->samples[j].time == 0) continue;
        ls->samples++;
        if (ls->samples == 1) {
            ls->min = ls->max = ts->samples[j].latency;
        } else {
            if (ls->min > ts->samples[j].latency) ls->min = ts->samples[j].latency;
            if (ls->max < ts->samples[j].latency) ls->max = ts->samples[j].latency;
        }
        sum += ts->samples[j].latency;

        /* Track the oldest event time in ls->period. */
        if (ls->period == 0 || ts->samples[j].time < ls->period) ls->period = ts->samples[j].time;
    }

    /* So far avg is actually the sum of the latencies, and period is
     * the oldest event time. We need to make the first an average and
     * the second a range of seconds. */
    if (ls->samples) {
        ls->avg = sum / ls->samples;
        ls->period = time(NULL) - ls->period;
        if (ls->period == 0) ls->period = 1;
    }

    /* Second pass, compute MAD. */
    sum = 0;
    for (j = 0; j < LATENCY_TS_LEN; j++) {
        int64_t delta;

        if (ts->samples[j].time == 0) continue;
        delta = (int64_t)ls->avg - ts->samples[j].latency;
        if (delta < 0) delta = -delta;
        sum += delta;
    }
    if (ls->samples) ls->mad = sum / ls->samples;
}

/* Create a human readable report of latency events for this instance. */
sds createLatencyReport(void) {
    sds report = sdsempty();
    int advise_better_vm = 0;          /* Better virtual machines. */
    int advise_slowlog_enabled = 0;    /* Enable slowlog. */
    int advise_slowlog_tuning = 0;     /* Reconfigure slowlog. */
    int advise_slowlog_inspect = 0;    /* Check your slowlog. */
    int advise_disk_contention = 0;    /* Try to lower disk contention. */
    int advise_scheduler = 0;          /* Intrinsic latency. */
    int advise_data_writeback = 0;     /* data=writeback. */
    int advise_no_appendfsync = 0;     /* don't fsync during rewrites. */
    int advise_local_disk = 0;         /* Avoid remote disks. */
    int advise_ssd = 0;                /* Use an SSD drive. */
    int advise_write_load_info = 0;    /* Print info about AOF and write load. */
    int advise_hz = 0;                 /* Use higher HZ. */
    int advise_large_objects = 0;      /* Deletion of large objects. */
    int advise_mass_eviction = 0;      /* Avoid mass eviction of keys. */
    int advise_relax_fsync_policy = 0; /* appendfsync always is slow. */
    int advise_disable_thp = 0;        /* AnonHugePages detected. */
    int advices = 0;

    /* Return ASAP if the latency engine is disabled and it looks like it
     * was never enabled so far. */
    if (dictSize(server.latency_events) == 0 && server.latency_monitor_threshold == 0) {
        report = sdscat(
            report, "I'm sorry, Dave, I can't do that. Latency monitoring is disabled in this Valkey instance. You may "
                    "use \"CONFIG SET latency-monitor-threshold <milliseconds>.\" in order to enable it.\n");
        return report;
    }

    /* Show all the events stats and add for each event some event-related
     * comment depending on the values. */
    dictIterator *di;
    dictEntry *de;
    int eventnum = 0;

    di = dictGetSafeIterator(server.latency_events);
    while ((de = dictNext(di)) != NULL) {
        char *event = dictGetKey(de);
        struct latencyTimeSeries *ts = dictGetVal(de);
        struct latencyStats ls;

        if (ts == NULL) continue;
        eventnum++;
        if (eventnum == 1) {
            report = sdscat(report, "Latency spikes are observed in this Valkey instance.\n\n");
        }
        analyzeLatencyForEvent(event, &ls);

        report = sdscatprintf(report,
                              "%d. %s: %d latency spikes (average %lums, mean deviation %lums, period %.2f sec). Worst "
                              "all time event %lums.",
                              eventnum, event, ls.samples, (unsigned long)ls.avg, (unsigned long)ls.mad,
                              (double)ls.period / ls.samples, (unsigned long)ts->max);

        /* Fork */
        if (!strcasecmp(event, "fork")) {
            char *fork_quality;
            if (server.stat_fork_rate < 10) {
                fork_quality = "terrible";
                advise_better_vm = 1;
                advices++;
            } else if (server.stat_fork_rate < 25) {
                fork_quality = "poor";
                advise_better_vm = 1;
                advices++;
            } else if (server.stat_fork_rate < 100) {
                fork_quality = "good";
            } else {
                fork_quality = "excellent";
            }
            report = sdscatprintf(report, " Fork rate is %.2f GB/sec (%s).", server.stat_fork_rate, fork_quality);
        }

        /* Potentially commands. */
        if (!strcasecmp(event, "command")) {
            if (server.slowlog_log_slower_than < 0 || server.slowlog_max_len == 0) {
                advise_slowlog_enabled = 1;
                advices++;
            } else if (server.slowlog_log_slower_than / 1000 > server.latency_monitor_threshold) {
                advise_slowlog_tuning = 1;
                advices++;
            }
            advise_slowlog_inspect = 1;
            advise_large_objects = 1;
            advices += 2;
        }

        /* fast-command. */
        if (!strcasecmp(event, "fast-command")) {
            advise_scheduler = 1;
            advices++;
        }

        /* AOF and I/O. */
        if (!strcasecmp(event, "aof-write-pending-fsync")) {
            advise_local_disk = 1;
            advise_disk_contention = 1;
            advise_ssd = 1;
            advise_data_writeback = 1;
            advices += 4;
        }

        if (!strcasecmp(event, "aof-write-active-child")) {
            advise_no_appendfsync = 1;
            advise_data_writeback = 1;
            advise_ssd = 1;
            advices += 3;
        }

        if (!strcasecmp(event, "aof-write-alone")) {
            advise_local_disk = 1;
            advise_data_writeback = 1;
            advise_ssd = 1;
            advices += 3;
        }

        if (!strcasecmp(event, "aof-fsync-always")) {
            advise_relax_fsync_policy = 1;
            advices++;
        }

        if (!strcasecmp(event, "aof-fstat") || !strcasecmp(event, "rdb-unlink-temp-file")) {
            advise_disk_contention = 1;
            advise_local_disk = 1;
            advices += 2;
        }

        if (!strcasecmp(event, "aof-rewrite-diff-write") || !strcasecmp(event, "aof-rename")) {
            advise_write_load_info = 1;
            advise_data_writeback = 1;
            advise_ssd = 1;
            advise_local_disk = 1;
            advices += 4;
        }

        /* Expire cycle. */
        if (!strcasecmp(event, "expire-cycle")) {
            advise_hz = 1;
            advise_large_objects = 1;
            advices += 2;
        }

        /* Eviction cycle. */
        if (!strcasecmp(event, "eviction-del")) {
            advise_large_objects = 1;
            advices++;
        }

        if (!strcasecmp(event, "eviction-cycle")) {
            advise_mass_eviction = 1;
            advices++;
        }

        report = sdscatlen(report, "\n", 1);
    }
    dictReleaseIterator(di);

    /* Add non event based advices. */
    if (THPGetAnonHugePagesSize() > 0) {
        advise_disable_thp = 1;
        advices++;
    }

    if (eventnum == 0 && advices == 0) {
        report = sdscat(report, "No latency spike was observed during the lifetime of this Valkey instance, not "
                                "in the slightest bit.\n");
    } else if (eventnum > 0 && advices == 0) {
        report = sdscat(report, "\nThere are latency events logged that are not easy to fix. Please get some "
                                "help from Valkey community, providing this report in your help request.\n");
    } else {
        /* Add all the suggestions accumulated so far. */

        /* Better VM. */
        report = sdscat(report, "\nHere is some advice for you:\n\n");
        if (advise_better_vm) {
            report = sdscat(report, "- If you are using a virtual machine, consider upgrading it with a faster one "
                                    "using a hypervisior that provides less latency during fork() calls. Xen is known "
                                    "to have poor fork() performance. Even in the context of the same VM provider, "
                                    "certain kinds of instances can execute fork faster than others.\n");
        }

        /* Slow log. */
        if (advise_slowlog_enabled) {
            report =
                sdscatprintf(report,
                             "- There are latency issues with potentially slow commands you are using. Try to enable "
                             "the Slow Log Valkey feature using the command 'CONFIG SET slowlog-log-slower-than %llu'. "
                             "If the Slow log is disabled Valkey is not able to log slow commands execution for you.\n",
                             (unsigned long long)server.latency_monitor_threshold * 1000);
        }

        if (advise_slowlog_tuning) {
            report = sdscatprintf(
                report,
                "- Your current Slow Log configuration only logs events that are slower than your configured latency "
                "monitor threshold. Please use 'CONFIG SET slowlog-log-slower-than %llu'.\n",
                (unsigned long long)server.latency_monitor_threshold * 1000);
        }

        if (advise_slowlog_inspect) {
            report = sdscat(report,
                            "- Check your Slow Log to understand what are the commands you are running which are too "
                            "slow to execute. Please check https://valkey.io/commands/slowlog for more information.\n");
        }

        /* Intrinsic latency. */
        if (advise_scheduler) {
            report = sdscat(
                report,
                "- The system is slow to execute Valkey code paths not containing system calls. This usually means the "
                "system does not provide Valkey CPU time to run for long periods. You should try to:\n"
                "  1) Lower the system load.\n"
                "  2) Use a computer / VM just for Valkey if you are running other software in the same system.\n"
                "  3) Check if you have a \"noisy neighbour\" problem.\n"
                "  4) Check with 'valkey-cli --intrinsic-latency 100' what is the intrinsic latency in your system.\n"
                "  5) Check if the problem is allocator-related by recompiling Valkey with MALLOC=libc, if you are "
                "using Jemalloc. However this may create fragmentation problems.\n");
        }

        /* AOF / Disk latency. */
        if (advise_local_disk) {
            report = sdscat(report,
                            "- It is strongly advised to use local disks for persistence, especially if you are using "
                            "AOF. Remote disks provided by platform-as-a-service providers are known to be slow.\n");
        }

        if (advise_ssd) {
            report = sdscat(report,
                            "- SSD disks are able to reduce fsync latency, and total time needed for snapshotting "
                            "and AOF log rewriting (resulting in smaller memory usage). With extremely high write "
                            "load SSD disks can be a good option. However Valkey should perform reasonably with high "
                            "load using normal disks. Use this advice as a last resort.\n");
        }

        if (advise_data_writeback) {
            report =
                sdscat(report, "- Mounting ext3/4 filesystems with data=writeback can provide a performance boost "
                               "compared to data=ordered, however this mode of operation provides less guarantees, and "
                               "sometimes it can happen that after a hard crash the AOF file will have a half-written "
                               "command at the end and will require to be repaired before Valkey restarts.\n");
        }

        if (advise_disk_contention) {
            report = sdscat(report, "- Try to lower the disk contention. This is often caused by other disk intensive "
                                    "processes running in the same computer (including other Valkey instances).\n");
        }

        if (advise_no_appendfsync) {
            report = sdscat(report, "- Assuming from the point of view of data safety this is viable in your "
                                    "environment, you could try to enable the 'no-appendfsync-on-rewrite' option, so "
                                    "that fsync will not be performed while there is a child rewriting the AOF file or "
                                    "producing an RDB file (the moment where there is high disk contention).\n");
        }

        if (advise_relax_fsync_policy && server.aof_fsync == AOF_FSYNC_ALWAYS) {
            report = sdscat(report, "- Your fsync policy is set to 'always'. It is very hard to get good performances "
                                    "with such a setup, if possible try to relax the fsync policy to 'onesec'.\n");
        }

        if (advise_write_load_info) {
            report = sdscat(
                report, "- Latency during the AOF atomic rename operation or when the final difference is flushed to "
                        "the AOF file at the end of the rewrite, sometimes is caused by very high write load, causing "
                        "the AOF buffer to get very large. If possible try to send less commands to accomplish the "
                        "same work, or use Lua scripts to group multiple operations into a single EVALSHA call.\n");
        }

        if (advise_hz && server.hz < 100) {
            report = sdscat(report, "- In order to make the Valkey keys expiring process more incremental, try to set "
                                    "the 'hz' configuration parameter to 100 using 'CONFIG SET hz 100'.\n");
        }

        if (advise_large_objects) {
            report =
                sdscat(report, "- Deleting, expiring or evicting (because of maxmemory policy) large objects is a "
                               "blocking operation. If you have very large objects that are often deleted, expired, or "
                               "evicted, try to fragment those objects into multiple smaller objects.\n");
        }

        if (advise_mass_eviction) {
            report = sdscat(report, "- Sudden changes to the 'maxmemory' setting via 'CONFIG SET', or allocation of "
                                    "large objects via sets or sorted sets intersections, STORE option of SORT, Valkey "
                                    "Cluster large keys migrations (RESTORE command), may create sudden memory "
                                    "pressure forcing the server to block trying to evict keys. \n");
        }

        if (advise_disable_thp) {
            report =
                sdscat(report, "- I detected a non zero amount of anonymous huge pages used by your process. This "
                               "creates very serious latency events in different conditions, especially when "
                               "Valkey is persisting on disk. To disable THP support use the command 'echo never > "
                               "/sys/kernel/mm/transparent_hugepage/enabled', make sure to also add it into "
                               "/etc/rc.local so that the command will be executed again after a reboot. Note "
                               "that even if you have already disabled THP, you still need to restart the Valkey "
                               "process to get rid of the huge pages already created.\n");
        }
    }

    return report;
}

/* ---------------------- Latency command implementation -------------------- */

/* latencyCommand() helper to produce a map of time buckets,
 * each representing a latency range,
 * between 1 nanosecond and roughly 1 second.
 * Each bucket covers twice the previous bucket's range.
 * Empty buckets are not printed.
 * Everything above 1 sec is considered +Inf.
 * At max there will be log2(1000000000)=30 buckets */
void fillCommandCDF(client *c, struct hdr_histogram *histogram) {
    addReplyMapLen(c, 2);
    addReplyBulkCString(c, "calls");
    addReplyLongLong(c, (long long)histogram->total_count);
    addReplyBulkCString(c, "histogram_usec");
    void *replylen = addReplyDeferredLen(c);
    int samples = 0;
    struct hdr_iter iter;
    hdr_iter_log_init(&iter, histogram, 1024, 2);
    int64_t previous_count = 0;
    while (hdr_iter_next(&iter)) {
        const int64_t micros = iter.highest_equivalent_value / 1000;
        const int64_t cumulative_count = iter.cumulative_count;
        if (cumulative_count > previous_count) {
            addReplyLongLong(c, (long long)micros);
            addReplyLongLong(c, (long long)cumulative_count);
            samples++;
        }
        previous_count = cumulative_count;
    }
    setDeferredMapLen(c, replylen, samples);
}

/* latencyCommand() helper to produce for all commands,
 * a per command cumulative distribution of latencies. */
void latencyAllCommandsFillCDF(client *c, dict *commands, int *command_with_data) {
    dictIterator *di = dictGetSafeIterator(commands);
    dictEntry *de;
    struct serverCommand *cmd;

    while ((de = dictNext(di)) != NULL) {
        cmd = (struct serverCommand *)dictGetVal(de);
        if (cmd->latency_histogram) {
            addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
            fillCommandCDF(c, cmd->latency_histogram);
            (*command_with_data)++;
        }

        if (cmd->subcommands) {
            latencyAllCommandsFillCDF(c, cmd->subcommands_dict, command_with_data);
        }
    }
    dictReleaseIterator(di);
}

/* latencyCommand() helper to produce for a specific command set,
 * a per command cumulative distribution of latencies. */
void latencySpecificCommandsFillCDF(client *c) {
    void *replylen = addReplyDeferredLen(c);
    int command_with_data = 0;
    for (int j = 2; j < c->argc; j++) {
        struct serverCommand *cmd = lookupCommandBySds(c->argv[j]->ptr);
        /* If the command does not exist we skip the reply */
        if (cmd == NULL) {
            continue;
        }

        if (cmd->latency_histogram) {
            addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
            fillCommandCDF(c, cmd->latency_histogram);
            command_with_data++;
        }

        if (cmd->subcommands_dict) {
            dictEntry *de;
            dictIterator *di = dictGetSafeIterator(cmd->subcommands_dict);

            while ((de = dictNext(di)) != NULL) {
                struct serverCommand *sub = dictGetVal(de);
                if (sub->latency_histogram) {
                    addReplyBulkCBuffer(c, sub->fullname, sdslen(sub->fullname));
                    fillCommandCDF(c, sub->latency_histogram);
                    command_with_data++;
                }
            }
            dictReleaseIterator(di);
        }
    }
    setDeferredMapLen(c, replylen, command_with_data);
}

/* latencyCommand() helper to produce a time-delay reply for all the samples
 * in memory for the specified time series. */
void latencyCommandReplyWithSamples(client *c, struct latencyTimeSeries *ts) {
    void *replylen = addReplyDeferredLen(c);
    int samples = 0, j;

    for (j = 0; j < LATENCY_TS_LEN; j++) {
        int i = (ts->idx + j) % LATENCY_TS_LEN;

        if (ts->samples[i].time == 0) continue;
        addReplyArrayLen(c, 2);
        addReplyLongLong(c, ts->samples[i].time);
        addReplyLongLong(c, ts->samples[i].latency);
        samples++;
    }
    setDeferredArrayLen(c, replylen, samples);
}

/* latencyCommand() helper to produce the reply for the LATEST subcommand,
 * listing the last latency sample for every event type registered so far. */
void latencyCommandReplyWithLatestEvents(client *c) {
    dictIterator *di;
    dictEntry *de;

    addReplyArrayLen(c, dictSize(server.latency_events));
    di = dictGetIterator(server.latency_events);
    while ((de = dictNext(di)) != NULL) {
        char *event = dictGetKey(de);
        struct latencyTimeSeries *ts = dictGetVal(de);
        int last = (ts->idx + LATENCY_TS_LEN - 1) % LATENCY_TS_LEN;

        addReplyArrayLen(c, 4);
        addReplyBulkCString(c, event);
        addReplyLongLong(c, ts->samples[last].time);
        addReplyLongLong(c, ts->samples[last].latency);
        addReplyLongLong(c, ts->max);
    }
    dictReleaseIterator(di);
}

#define LATENCY_GRAPH_COLS 80
sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) {
    int j;
    struct sequence *seq = createSparklineSequence();
    sds graph = sdsempty();
    uint32_t min = 0, max = 0;

    for (j = 0; j < LATENCY_TS_LEN; j++) {
        int i = (ts->idx + j) % LATENCY_TS_LEN;
        int elapsed;
        char buf[64];

        if (ts->samples[i].time == 0) continue;
        /* Update min and max. */
        if (seq->length == 0) {
            min = max = ts->samples[i].latency;
        } else {
            if (ts->samples[i].latency > max) max = ts->samples[i].latency;
            if (ts->samples[i].latency < min) min = ts->samples[i].latency;
        }
        /* Use as label the number of seconds / minutes / hours / days
         * ago the event happened. */
        elapsed = time(NULL) - ts->samples[i].time;
        if (elapsed < 60)
            snprintf(buf, sizeof(buf), "%ds", elapsed);
        else if (elapsed < 3600)
            snprintf(buf, sizeof(buf), "%dm", elapsed / 60);
        else if (elapsed < 3600 * 24)
            snprintf(buf, sizeof(buf), "%dh", elapsed / 3600);
        else
            snprintf(buf, sizeof(buf), "%dd", elapsed / (3600 * 24));
        sparklineSequenceAddSample(seq, ts->samples[i].latency, buf);
    }

    graph = sdscatprintf(graph, "%s - high %lu ms, low %lu ms (all time high %lu ms)\n", event, (unsigned long)max,
                         (unsigned long)min, (unsigned long)ts->max);
    for (j = 0; j < LATENCY_GRAPH_COLS; j++) graph = sdscatlen(graph, "-", 1);
    graph = sdscatlen(graph, "\n", 1);
    graph = sparklineRender(graph, seq, LATENCY_GRAPH_COLS, 4, SPARKLINE_FILL);
    freeSparklineSequence(seq);
    return graph;
}

/* LATENCY command implementations.
 *
 * LATENCY HISTORY: return time-latency samples for the specified event.
 * LATENCY LATEST: return the latest latency for all the events classes.
 * LATENCY DOCTOR: returns a human readable analysis of instance latency.
 * LATENCY GRAPH: provide an ASCII graph of the latency of the specified event.
 * LATENCY RESET: reset data of a specified event or all the data if no event provided.
 * LATENCY HISTOGRAM: return a cumulative distribution of latencies in the format of an histogram for the specified
 * command names.
 */
void latencyCommand(client *c) {
    struct latencyTimeSeries *ts;

    if (!strcasecmp(c->argv[1]->ptr, "history") && c->argc == 3) {
        /* LATENCY HISTORY <event> */
        ts = dictFetchValue(server.latency_events, c->argv[2]->ptr);
        if (ts == NULL) {
            addReplyArrayLen(c, 0);
        } else {
            latencyCommandReplyWithSamples(c, ts);
        }
    } else if (!strcasecmp(c->argv[1]->ptr, "graph") && c->argc == 3) {
        /* LATENCY GRAPH <event> */
        sds graph;
        dictEntry *de;
        char *event;

        de = dictFind(server.latency_events, c->argv[2]->ptr);
        if (de == NULL) goto nodataerr;
        ts = dictGetVal(de);
        event = dictGetKey(de);

        graph = latencyCommandGenSparkeline(event, ts);
        addReplyVerbatim(c, graph, sdslen(graph), "txt");
        sdsfree(graph);
    } else if (!strcasecmp(c->argv[1]->ptr, "latest") && c->argc == 2) {
        /* LATENCY LATEST */
        latencyCommandReplyWithLatestEvents(c);
    } else if (!strcasecmp(c->argv[1]->ptr, "doctor") && c->argc == 2) {
        /* LATENCY DOCTOR */
        sds report = createLatencyReport();

        addReplyVerbatim(c, report, sdslen(report), "txt");
        sdsfree(report);
    } else if (!strcasecmp(c->argv[1]->ptr, "reset") && c->argc >= 2) {
        /* LATENCY RESET */
        if (c->argc == 2) {
            addReplyLongLong(c, latencyResetEvent(NULL));
        } else {
            int j, resets = 0;

            for (j = 2; j < c->argc; j++) resets += latencyResetEvent(c->argv[j]->ptr);
            addReplyLongLong(c, resets);
        }
    } else if (!strcasecmp(c->argv[1]->ptr, "histogram") && c->argc >= 2) {
        /* LATENCY HISTOGRAM*/
        if (c->argc == 2) {
            int command_with_data = 0;
            void *replylen = addReplyDeferredLen(c);
            latencyAllCommandsFillCDF(c, server.commands, &command_with_data);
            setDeferredMapLen(c, replylen, command_with_data);
        } else {
            latencySpecificCommandsFillCDF(c);
        }
    } else if (!strcasecmp(c->argv[1]->ptr, "help") && c->argc == 2) {
        /* clang-format off */
        const char *help[] = {
"DOCTOR",
"    Return a human readable latency analysis report.",
"GRAPH <event>",
"    Return an ASCII latency graph for the <event> class.",
"HISTORY <event>",
"    Return time-latency samples for the <event> class.",
"LATEST",
"    Return the latest latency samples for all events.",
"RESET [<event> ...]",
"    Reset latency data of one or more <event> classes.",
"    (default: reset all data for all event classes)",
"HISTOGRAM [COMMAND ...]",
"    Return a cumulative distribution of latencies in the format of a histogram for the specified command names.",
"    If no commands are specified then all histograms are replied.",
NULL
        };
        /* clang-format on */
        addReplyHelp(c, help);
    } else {
        addReplySubcommandSyntaxError(c);
    }
    return;

nodataerr:
    /* Common error when the user asks for an event we have no latency
     * information about. */
    addReplyErrorFormat(c, "No samples available for event '%s'", (char *)c->argv[2]->ptr);
}

void durationAddSample(int type, monotime duration) {
    if (type >= EL_DURATION_TYPE_NUM) {
        return;
    }
    durationStats *ds = &server.duration_stats[type];
    ds->cnt++;
    ds->sum += duration;
    if (duration > ds->max) {
        ds->max = duration;
    }
}
